﻿namespace MassTransit.Tests
{
    using System;
    using System.Runtime.Serialization;
    using System.Threading.Tasks;
    using MassTransit.Pipeline;
    using NUnit.Framework;
    using Shouldly;
    using TestFramework;
    using TestFramework.Messages;


    [TestFixture]
    public class Connecting_to_the_publish_observer_bus :
        InMemoryTestFixture
    {
        [Test]
        public async Task Should_invoke_the_exception_after_send_failure()
        {
            var observer = new Observer();
            using (Bus.ConnectPublishObserver(observer))
            {
                Assert.Throws<SerializationException>(async () => await Bus.Publish(new PingMessage(), Pipe.Execute<SendContext>(x => x.Serializer = null)));

                await observer.SendFaulted;
            }
        }

        [Test]
        public async Task Should_invoke_the_observer_after_send()
        {
            var observer = new Observer();
            using (Bus.ConnectPublishObserver(observer))
            {
                await Bus.Publish(new PingMessage());

                await observer.PostSent;
            }
        }

        [Test]
        public async Task Should_invoke_the_observer_prior_to_send()
        {
            var observer = new Observer();
            using (Bus.ConnectPublishObserver(observer))
            {
                await Bus.Publish(new PingMessage());

                await observer.PreSent;
            }
        }

        [Test]
        public async Task Should_not_invoke_post_sent_on_exception()
        {
            var observer = new Observer();
            using (Bus.ConnectPublishObserver(observer))
            {
                Assert.Throws<SerializationException>(async () => await Bus.Publish(new PingMessage(), Pipe.Execute<SendContext>(x => x.Serializer = null)));

                await observer.SendFaulted;

                observer.PostSent.Status.ShouldBe(TaskStatus.WaitingForActivation);
            }
        }


        class Observer :
            IPublishObserver
        {
            readonly TaskCompletionSource<PublishContext> _postSend;
            readonly TaskCompletionSource<PublishContext> _preSend;
            readonly TaskCompletionSource<PublishContext> _sendFaulted;

            public Observer()
            {
                _sendFaulted = new TaskCompletionSource<PublishContext>();
                _preSend = new TaskCompletionSource<PublishContext>();
                _postSend = new TaskCompletionSource<PublishContext>();
            }

            public Task<PublishContext> PreSent
            {
                get { return _preSend.Task; }
            }

            public Task<PublishContext> PostSent
            {
                get { return _postSend.Task; }
            }

            public Task<PublishContext> SendFaulted
            {
                get { return _sendFaulted.Task; }
            }

            public async Task PrePublish<T>(PublishContext<T> context)
                where T : class
            {
                _preSend.TrySetResult(context);
            }

            public async Task PostPublish<T>(PublishContext<T> context)
                where T : class
            {
                _postSend.TrySetResult(context);
            }

            public async Task PublishFault<T>(PublishContext<T> context, Exception exception)
                where T : class
            {
                _sendFaulted.TrySetResult(context);
            }
        }
    }
}